package com.skyline.courier.mq.provider.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.skyline.courier.mq.ExchangeType;
import com.skyline.courier.mq.MessageHandleServer;
import com.skyline.courier.mq.MessageHandler;
import com.skyline.courier.mq.NamedThreadFactory;
import com.skyline.courier.mq.Protocol;

/**
 * RabbitMQ消息处理服务端
 * 
 * @author wuqh
 * 
 */
public class RabbitMessageHandleServer implements MessageHandleServer {
	private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMessageHandleServer.class);
	private static final ExchangeType DEFAULT_EXCHANGE_TYPE = ExchangeType.DIRECT;
	private static final int DEFAULT_POOL_SIZE = 1;
	private static final boolean DEFAULT_AUTO_ACK = true;

	private String hosts;
	private RabbitChannelFactory channelFactory;
	private String exchangeName;
	private ExchangeType exchangeType = DEFAULT_EXCHANGE_TYPE;

	private boolean autoAck = DEFAULT_AUTO_ACK;
	private int poolsize = DEFAULT_POOL_SIZE;
	private String queueName;
	private int workThreads = (Runtime.getRuntime().availableProcessors() * 2);

	private volatile boolean serverRunning = false;

	private Map<String, RabbitMessageProcessor> processors = new HashMap<String, RabbitMessageHandleServer.RabbitMessageProcessor>();

	@Override
	public void setExchangeType(ExchangeType exchangeType) {
		this.exchangeType = exchangeType;
	}

	@Override
	public void setExchangeName(String exchangeName) {
		this.exchangeName = exchangeName;
	}

	@Override
	public void setHost(String hosts) {
		this.hosts = hosts;
	}
	
	@Override
	public void setWorkThreads(int workThreads) {
		this.workThreads = workThreads;
	}

	@Override
	public synchronized void start() {
		for (RabbitMessageProcessor processor : processors.values()) {
			processor.start();
		}
		serverRunning = true;
	}

	@Override
	public synchronized void shutdown() {
		for (RabbitMessageProcessor processor : processors.values()) {
			processor.destroy();
		}
		channelFactory.destroy();
		serverRunning = false;
	}

	@Override
	public synchronized void registerMessageHandler(String key, MessageHandler handler, Protocol<?, byte[]> protocol) {
		RabbitMessageProcessor processor = null;
		if (processors.containsKey(key)) {
			LOGGER.info("替换在[" + key + "]上的消息处理器");
			processor = processors.get(key);
			processor.bindRoutingKey = key;
			processor.handler = new DeliveryHandler(handler, protocol);
		} else {
			processor = new RabbitMessageProcessor();
			processor.bindRoutingKey = key;
			processor.handler = new DeliveryHandler(handler, protocol);
			processors.put(key, processor);
		}

		if (serverRunning) {
			processor.start();
		}
	}

	public void setChannelFactory(RabbitChannelFactory channelFactory) {
		this.channelFactory = channelFactory;
	}

	public void setAutoAck(boolean autoAck) {
		this.autoAck = autoAck;
	}

	public void setPoolsize(int poolsize) {
		this.poolsize = poolsize;
	}

	public void setQueueName(String queueName) {
		this.queueName = queueName;
	}

	private RabbitChannelFactory createChannelFactory() {
		if (channelFactory != null) {
			return channelFactory;
		}

		RabbitConnectionFactory connectionFactory = new RabbitConnectionFactory();
		connectionFactory.setHosts(hosts);

		channelFactory = new RabbitChannelFactory();
		channelFactory.setConnectionFactory(connectionFactory);

		return channelFactory;

	}

	/**
	 * RabbitMQ消息处理器
	 * 
	 * @author wuqh
	 * 
	 */
	public class RabbitMessageProcessor implements ShutdownListener {
		private static final long ONE_SECOND = 1000L;

		private DeliveryHandler handler;

		private String bindRoutingKey;

		private Channel channel;
		private volatile QueueingConsumer consumer;
		private volatile ExecutorService workerExecutor;
		private volatile boolean running = false;

		private void start() {
			startConsumer();
			
			startWorker();
		}

		private void startWorker() {
			if(workerExecutor != null) {
				workerExecutor.shutdown();
			}
			workerExecutor = Executors.newFixedThreadPool(workThreads, new NamedThreadFactory("MQServerWorker"));
			running = true;
			workerExecutor.execute(new Worker());
		}

		private void destroy() {
			running = false;
			channel = null;
		}

		private void startConsumer() {
			if (channel == null || !channel.isOpen()) {
				try {
					channel = createChannelFactory().createChannel();
					channel.getConnection().addShutdownListener(this);

					String internalQueueName;
					if (queueName == null) {
						internalQueueName = channel.queueDeclare().getQueue();
					} else {
						internalQueueName = channel.queueDeclare(queueName, false, true, true, null).getQueue();
					}
					channel.exchangeDeclare(exchangeName, exchangeType.toString());
					channel.queueBind(internalQueueName, exchangeName, bindRoutingKey);

					consumer = new QueueingConsumer(channel);
					for (int i = 1; i <= poolsize; i++) {
						channel.basicConsume(internalQueueName, autoAck, consumer);
						if (LOGGER.isInfoEnabled()) {
							LOGGER.info("启动第[" + i + "]个消费者，exchange [" + exchangeName + "][" + exchangeType
									+ "]， queue [" + queueName + "] - routingKey [" + bindRoutingKey + "]");
						}
					}
				} catch (IOException e) {
					LOGGER.warn("Unable start consumer", e);
				}
			}
		}

		@Override
		public void shutdownCompleted(ShutdownSignalException cause) {
			if (LOGGER.isInfoEnabled()) {
				LOGGER.info("Channel连接丢失，原因 [" + cause.getReason() + "]");
				LOGGER.info("Reference [" + cause.getReference() + "]");
			}

			if (cause.isInitiatedByApplication()) {
				if (LOGGER.isInfoEnabled()) {
					LOGGER.info("被应用关闭");
				}
			} else if (cause.isHardError()) {
				LOGGER.error("由于硬件问题关闭Channel，尝试进行重新连接...");
				startConsumer();
			}
		}

		public void handleDelivery(Delivery delivery) {
			if (handler != null) {
				handler.handleMessage(channel, delivery);
			}
		}

		private final class Worker implements Runnable {

			@Override
			public void run() {
				while (running) {
					try {
						Delivery delivery = (consumer == null ? null : consumer.nextDelivery(ONE_SECOND));

						if (delivery != null) {
							handleDelivery(delivery);
						}
					} catch (InterruptedException ie) {
						if (LOGGER.isDebugEnabled()) {
							LOGGER.debug("等待获取消息发生异常");
						}
					} catch (Exception e) {
						LOGGER.error("发生消息发生异常", e);
					}
				}
			}
		}

	}

}
